Перейти к основному содержимому

6.05. Интеграция развернутой ИИ-модели в Python-приложение

Всем

Интеграция развернутой ИИ-модели в Python-приложение

Интеграция модели в код требует проектирования надёжного клиентского слоя, обработки граничных случаев и соответствия архитектурным требованиям приложения. Ниже представлены проверенные подходы для промышленной эксплуатации.

Паттерны интеграции

1. Адаптер с единой точкой входа

Создайте класс-адаптер, инкапсулирующий детали взаимодействия с моделью. Это обеспечивает заменяемость бэкенда без изменения бизнес-логики.

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
import httpx
import backoff

@dataclass
class ModelResponse:
text: str
tokens_used: int
generation_time_ms: float
model_version: str

class AIModelAdapter(ABC):
@abstractmethod
async def generate(self, prompt: str, **kwargs) -> ModelResponse:
pass

class OpenAICompatibleAdapter(AIModelAdapter):
def __init__(
self,
base_url: str,
api_key: str,
model_name: str,
timeout: float = 30.0
):
self.client = httpx.AsyncClient(
base_url=base_url,
headers={"Authorization": f"Bearer {api_key}"},
timeout=timeout
)
self.model_name = model_name

@backoff.on_exception(
backoff.expo,
(httpx.TimeoutException, httpx.HTTPStatusError),
max_tries=3
)
async def generate(self, prompt: str, **kwargs) -> ModelResponse:
payload = {
"model": self.model_name,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": kwargs.get("max_tokens", 512),
"temperature": kwargs.get("temperature", 0.7),
"stream": False
}

response = await self.client.post("/v1/chat/completions", json=payload)
response.raise_for_status()
data = response.json()

choice = data["choices"][0]
usage = data["usage"]

return ModelResponse(
text=choice["message"]["content"],
tokens_used=usage["total_tokens"],
generation_time_ms=response.elapsed.total_seconds() * 1000,
model_version=data.get("model", self.model_name)
)

async def close(self):
await self.client.aclose()

2. Синхронный клиент для легаси-кода

Для интеграции в синхронные приложения используйте requests с таймаутами и повторными попытками через urllib3.util.retry.

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class SyncModelClient:
def __init__(self, base_url: str, api_key: str):
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
})
self.base_url = base_url

def generate(self, prompt: str, timeout: float = 15.0) -> Dict[str, Any]:
try:
response = self.session.post(
f"{self.base_url}/v1/chat/completions",
json={
"model": "custom-model",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 256
},
timeout=timeout
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except requests.exceptions.Timeout:
raise TimeoutError(f"Model inference exceeded {timeout}s")
except requests.exceptions.RequestException as e:
raise ConnectionError(f"Model API error: {e}")

Обработка ошибок и отказоустойчивость

Реализуйте стратегии для типовых сценариев отказа:

СценарийМеханизм обработкиПример реализации
Таймаут генерацииТаймаут на уровне клиента + circuit breakerhttpx.Timeout, pybreaker
Перегрузка моделиОграничение скорости на клиентеlimits + backoff
Некорректный промптВалидация до отправкиpydantic для структурированных входов
Отказ инстансаФолловер на резервный эндпоинтСписок базовых URL в конфигурации
Выброс памяти (OOM)Автоматическое снижение max_tokensПовторный запрос с уменьшенными лимитами

Пример использования circuit breaker:

import pybreaker

class ResilientModelClient:
def __init__(self, base_url: str, api_key: str):
self.breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60000 # 60 секунд
)
self.adapter = OpenAICompatibleAdapter(base_url, api_key, "my-model")

@pybreaker.CircuitBreakerError
def _call_model(self, prompt: str) -> ModelResponse:
return await self.adapter.generate(prompt)

async def generate_with_fallback(self, prompt: str) -> ModelResponse:
try:
return await self._call_model(prompt)
except pybreaker.CircuitBreakerError:
# Переключение на резервную модель меньшего размера
fallback = OpenAICompatibleAdapter(
base_url="https://backup.example.com",
api_key="backup-key",
model_name="my-model-small"
)
return await fallback.generate(prompt)

Асинхронная интеграция с очередями

Для высоконагруженных систем используйте асинхронную обработку через очереди (Redis, RabbitMQ):

import asyncio
import aioredis
from pydantic import BaseModel

class InferenceRequest(BaseModel):
request_id: str
prompt: str
user_id: str
priority: int = 5 # 1-10

class AsyncInferenceQueue:
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
self.queue_key = "model:inference:queue"

async def enqueue(self, request: InferenceRequest):
payload = request.json()
# Приоритет через sorted set: score = priority
await self.redis.zadd(self.queue_key, {payload: request.priority})

async def worker(self, model_adapter: AIModelAdapter):
while True:
# Извлечение задачи с наивысшим приоритетом (минимальный score)
task = await self.redis.zpopmin(self.queue_key)
if task:
request = InferenceRequest.parse_raw(task[0][0])
try:
response = await model_adapter.generate(request.prompt)
await self._publish_result(request.request_id, response)
except Exception as e:
await self._publish_error(request.request_id, str(e))
await asyncio.sleep(0.01) # Предотвращение busy-wait

async def _publish_result(self, request_id: str, response: ModelResponse):
await self.redis.setex(
f"result:{request_id}",
3600, # TTL 1 час
response.json()
)

Кэширование семантически эквивалентных запросов

Для снижения нагрузки на модель реализуйте кэширование на основе хэша промпта:

import hashlib
from typing import Optional

class CachedModelClient:
def __init__(self, adapter: AIModelAdapter, redis_client: aioredis.Redis):
self.adapter = adapter
self.redis = redis_client
self.cache_ttl = 86400 # 24 часа

def _prompt_hash(self, prompt: str, params: Dict[str, Any]) -> str:
# Сериализация параметров для воспроизводимости хэша
param_str = "|".join(f"{k}={v}" for k, v in sorted(params.items()))
combined = f"{prompt}|{param_str}"
return hashlib.sha256(combined.encode()).hexdigest()

async def generate(self, prompt: str, **kwargs) -> ModelResponse:
cache_key = f"model:cache:{self._prompt_hash(prompt, kwargs)}"
cached = await self.redis.get(cache_key)

if cached:
return ModelResponse.parse_raw(cached)

response = await self.adapter.generate(prompt, **kwargs)
await self.redis.setex(cache_key, self.cache_ttl, response.json())
return response

Тестирование интеграции

Покройте интеграционные сценарии:

import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_model_adapter_success():
mock_response = {
"choices": [{"message": {"content": "Тестовый ответ"}}],
"usage": {"total_tokens": 42},
"model": "test-model"
}

with patch("httpx.AsyncClient.post") as mock_post:
mock_post.return_value = AsyncMock(
status_code=200,
json=lambda: mock_response,
elapsed=type("obj", (), {"total_seconds": lambda: 0.123})()
)

adapter = OpenAICompatibleAdapter(
base_url="http://test",
api_key="test",
model_name="test-model"
)
result = await adapter.generate("Тестовый промпт")

assert result.text == "Тестовый ответ"
assert result.tokens_used == 42

Рекомендации по эксплуатации

  1. Валидация входных данных: Перед отправкой в модель проверяйте длину промпта и кодировку символов. Для русскоязычных моделей убедитесь в поддержке UTF-8.
  2. Лимитирование ресурсов: Устанавливайте max_tokens в зависимости от доступной памяти. Для 4-bit квантованной модели 7B параметров безопасный лимит — 2048 токенов на запрос.
  3. Логирование для аудита: Сохраняйте хэши промптов (не содержимое) и метаданные запросов для анализа использования без нарушения конфиденциальности.
  4. Грейсфул-деградация: При недоступности модели возвращайте структурированную ошибку с кодом 503 и рекомендацией повторить запрос через интервал.

Интеграция модели в код требует проектирования как клиентской, так и серверной частей с едиными контрактами. Использование адаптеров и стратегий отказоустойчивости позволяет изолировать бизнес-логику от изменений в инфраструктуре вывода модели.